#include "config.h"

#include <sys/mman.h>
#include <libaio.h>
#include <errno.h>
#include <sys/eventfd.h>

#define DBG_SUBSYS S_LIBREPLICA

#include "replica.h"
#include "job_dock.h"
#include "sysy_lib.h"
#include "cluster.h"
#include "fileinfo.h"
#include "squeue.h"
#include "bh.h"
#include "timer.h"
#include "disk.h"
#include "core.h"
#include "mem_cache.h"
#include "main_loop.h"
#include "net_global.h"
#include "bmap.h"
#include "dbg.h"
#include "../../storage/controller/ramdisk.h"
#include "bit_lock.h"
#include "ypage.h"
/*
  ----------------------------------------------------------------------------------
  | replica_srv meta (4k) | chunk info (4k) | chunk data |
  ----------------------------------------------------------------------------------

  replica_srv meta : chkmeta_t, 4K align
  chunk info : chkinfo_t 4K align
  chunk data : opaque data 4M / 16M / 32M/ 64M

*/

/* File-local shorthand for the replica server entry type. */
typedef replica_srv_entry_t entry_t;

/**
 * Decide whether a write request may be applied to a chunk entry.
 *
 * The checks run in this order: lease (skipped when io->lease is -1),
 * owner, magic, and finally the clock, which has to be exactly one step
 * ahead of the clock stored in the entry.
 *
 * @param ent    chunk entry the write targets
 * @param writer node issuing the write
 * @param io     io descriptor carrying lease and vclock
 * @param magic  magic value supplied by the writer
 * @param retry  caller's retry count; only used to decide whether the
 *               "too far ahead" case is worth a debug line
 *
 * @return 0 if the write is acceptable, EPERM on lease/owner/magic mismatch,
 *         EINVAL if the io clock is not newer than the entry clock,
 *         EAGAIN if the io clock is more than one step ahead.
 */
STATIC int __replica_write_check(entry_t *ent, const nid_t *writer,
                                 const io_t *io, uint32_t magic, int retry)
{
        int ret;
        const vclock_t *cur = &ent->vclock;
        const vclock_t *req = &io->vclock;

        /* lease == -1 means "do not check the lease" */
        if (unlikely(io->lease != -1 && io->lease != ent->token.seq)) {
                DWARN("chunk "CHKID_FORMAT" owner %s:%s, lease %x:%x\n",
                      CHKID_ARG(&ent->chkid), network_rname(&ent->owner),
                      network_rname(writer), ent->token.seq, io->lease);
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        /* only the recorded owner is allowed to write */
        if (unlikely(nid_cmp(&ent->owner, writer) != 0)) {
                DWARN("chunk "CHKID_FORMAT" owner %s:%s, magic %x:%x, clock %ju:%ju\n",
                      CHKID_ARG(&ent->chkid), network_rname(&ent->owner),
                      network_rname(writer), ent->magic, magic,
                      cur->clock, req->clock);
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        if (unlikely(magic != ent->magic)) {
                DWARN("chunk "CHKID_FORMAT", magic %x:%x, clock %ju:%ju\n",
                      CHKID_ARG(&ent->chkid), ent->magic, magic,
                      cur->clock, req->clock);
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        /* stale or duplicated clock */
        if (unlikely(req->clock <= cur->clock)) {
                // TODO: this can trigger the assert at replica_rpc.c:369,
                // or it may be related to concurrency inside a chunk
                DWARN("chunk "CHKID_FORMAT" owner %s:%s, magic %x:%x, clock %ju:%ju\n",
                      CHKID_ARG(&ent->chkid), network_rname(&ent->owner),
                      network_rname(writer), ent->magic, magic,
                      cur->clock, req->clock);
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        /* an earlier write has not landed yet: caller should wait and retry */
        if (unlikely(req->clock > cur->clock + 1)) {
                if (retry > REPLICA_MAX_RETRY / 2) {
                        DBUG("chunk "CHKID_FORMAT" op %ju --> %ju\n",
                              CHKID_ARG(&ent->chkid), req->clock, cur->clock);
                }

                ret = EAGAIN;
                goto err_ret;   /* plain goto, as in the expected-retry path */
        }

        YASSERT(req->clock == cur->clock + 1);

        return 0;
err_ret:
        return ret;
}

/**
 * Store a clock (and dirty flag) for the chunk through clock_set().
 *
 * An EBUSY result is retried up to 10 times, sleeping via schedule_sleep()
 * between attempts; any other error, or EBUSY after the retries are used
 * up, is returned to the caller.
 *
 * @return 0 on success, otherwise the error from clock_set().
 */
STATIC int __replica_write_clock(entry_t *ent, const vclock_t *vclock, int dirty)
{
        int ret;
        int attempt = 0;

        for (;;) {
                ret = clock_set(&ent->chkid, vclock, dirty);
                if (likely(ret == 0)) {
                        return 0;
                }

                if (ret != EBUSY || attempt >= 10) {
                        GOTO(err_ret, ret);
                }

                DERROR("set "CHKID_FORMAT" clock %ju busy retry %u\n",
                      CHKID_ARG(&ent->chkid), vclock->clock, attempt);

                schedule_sleep("clock busy", 1000);
                attempt++;
        }

err_ret:
        return ret;
}

/**
 * Wake every queued reader whose awaited clock has been reached.
 *
 * Walks ent->wq.wlist and resumes (with retval 0) each __OP_READ_WAIT__
 * waiter whose clock is <= vclock->clock. A warning is logged when the
 * queue holds more than 100 entries. The wait-queue lock is not taken here.
 */
STATIC void __replica_read_resume_clock(entry_t *ent, const vclock_t *vclock)
{
        struct list_head *cur, *next;
        int visited = 0;

        list_for_each_safe(cur, next, &ent->wq.wlist) {
                /* the list node pointer is used directly as the waiter */
                wlist_t *waiter = (void *)cur;

                visited++;

                if (waiter->op != __OP_READ_WAIT__) {
                        continue;
                }

                if (vclock->clock < waiter->vclock.clock) {
                        continue;
                }

                DBUG("read resume clock %ju %ju\n", vclock->clock, waiter->vclock.clock);
                list_del_init(&waiter->hook);
                schedule_resume(&waiter->task, 0, NULL);
        }

        if (unlikely(visited > 100)) {
                DWARN("chunk "CHKID_FORMAT" wait %u\n", CHKID_ARG(&ent->chkid), visited);
        }
}

/**
 * Wake the queued writer that is next in clock order, if it is waiting.
 *
 * Scans ent->wq.wlist for a __OP_WRITE_WAIT__ waiter whose clock equals
 * vclock->clock + 1, resumes it with retval 0 and stops. Every other
 * queued writer is asserted to be ahead of the entry's current clock.
 */
STATIC void __replica_write_resume_clock(entry_t *ent, const vclock_t *vclock)
{
        struct list_head *cur, *next;

        list_for_each_safe(cur, next, &ent->wq.wlist) {
                /* the list node pointer is used directly as the waiter */
                wlist_t *waiter = (void *)cur;

                if (waiter->op != __OP_WRITE_WAIT__) {
                        continue;
                }

                if (vclock->clock + 1 != waiter->vclock.clock) {
                        YASSERT(waiter->vclock.clock > ent->vclock.clock);
                        continue;
                }

                /* only one writer can be the immediate successor */
                list_del_init(&waiter->hook);
                schedule_resume(&waiter->task, 0, NULL);
                break;
        }
}

/**
 * Resume waiters after a write with the given clock has completed.
 *
 * Under the wait-queue spin lock, first the next-in-order writer and then
 * all satisfied readers are resumed.
 */
STATIC void __replica_resume(entry_t *ent, const vclock_t *vclock)
{
        int rc = sy_spin_lock(&ent->wq.lock);

        if (unlikely(rc)) {
                UNIMPLEMENTED(__DUMP__);
        }

        /* writers first, then readers */
        __replica_write_resume_clock(ent, vclock);
        __replica_read_resume_clock(ent, vclock);

        sy_spin_unlock(&ent->wq.lock);
}

/**
 * Write a buffer to the chunk's disk location through diskmd_aio_write().
 *
 * @return 0 on success, otherwise the error from diskmd_aio_write().
 */
STATIC int __replica_writethrough(const chkid_t *chkid, const diskloc_t *loc,
                                  const buffer_t *buf, int offset, int prio)
{
        int ret = diskmd_aio_write(chkid, loc, buf, offset, prio);

        if (likely(ret == 0)) {
                return 0;
        }

        GOTO(err_ret, ret);
err_ret:
        return ret;
}


STATIC int __replica_write_aio__(const char *pool, const io_t *io, const buffer_t *buf, const diskloc_t *loc)
{
        int ret, prio;
        buffer_t *tmp;
        buffer_t align;

        (void) pool;

        if (likely(gloconf.rdma || buf == NULL || mbuffer_isalign(buf))) {
                tmp = (buffer_t *)buf;
        } else {
                mbuffer_clone(&align, buf);
                tmp = &align;
        }

        prio = io->id.type == __RAW_CHUNK__ ? 0 : 1;

        DBUG("write "CHKID_FORMAT" prio %u\n", CHKID_ARG(&io->id), prio);

        ret = __replica_writethrough(&io->id, loc, tmp, io->offset, prio);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

#if RAMDISK_ENABLE
        ret = ramdisk_write(&io->id, buf, io->size, io->offset);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        DWARN("replica write "CHKID_FORMAT" offset:%ld size:%d clock:%ld loc disk %d idx %d\n",
                        CHKID_ARG(&io->id), io->offset, io->size, io->vclock.clock, loc->diskid, loc->idx);
#endif

#if ISCSI_IO_RECORD
        char mem[MAX_INFO_LEN];

        sprintf(mem, "replica_write_aio "CHKID_FORMAT" (%llu, %llu)",
                        CHKID_ARG(&io->id),
                        (LLU)io->offset, (LLU)io->size);

        mbuffer_dump(buf, 8, mem);
#endif

        if (unlikely(tmp == &align)) {
                mbuffer_free(&align);
        }

        return 0;
err_ret:
        if (unlikely(tmp == &align)) {
                mbuffer_free(&align);
        }
        return ret;
}

#if 0
/*
 * Disabled (compiled out by the surrounding #if 0).
 *
 * Closes the connection to the writer and resumes every queued read/write
 * waiter with retval.
 *
 * NOTE(review): this walks ent->wlist, while the live code in this file
 * walks ent->wq.wlist and takes ent->wq.lock -- this would need updating
 * before being re-enabled.
 */
STATIC void __replica_write_cancel_clock(entry_t *ent, const nid_t *writer, int retval)
{
        wlist_t *wlist;
        struct list_head *pos, *n;

        network_close(writer, "cancel write fail", NULL);

        list_for_each_safe(pos, n, &ent->wlist) {
                wlist = (void *)pos;

                if (wlist->op == __OP_READ_WAIT__ || wlist->op == __OP_WRITE_WAIT__) {
                        list_del_init(&wlist->hook);
                        schedule_resume(&wlist->task, retval, NULL);
                }
        }
}
#endif

#if ENABLE_CHUNK_PARALLEL

/**
 * Write-lock every metadata page touched by an io.
 *
 * Does nothing when the entry has no bits lock. Otherwise the io range is
 * aligned to REPLICA_DISK_MD_PAGE_SIZE with range_align() and each page of
 * the aligned range is locked in ascending order.
 */
static inline void __replica_bits_lock(const io_t *io, entry_t *ent)
{
        int i;
        uint64_t off_new;
        uint32_t len_new;
        uint32_t pages;
        uint64_t first;

        if (likely(!ent->pio.bits_lock)) {
                return;
        }

        range_align(REPLICA_DISK_MD_PAGE_SIZE, io->offset, (uint32_t)io->size, &off_new, &len_new);

        pages = len_new / REPLICA_DISK_MD_PAGE_SIZE;
        first = off_new / REPLICA_DISK_MD_PAGE_SIZE;

        for (i = 0; i < pages; i++) {
                bits_lock_wlock(ent->pio.bits_lock, first + i);
        }
}

/**
 * Release the per-page locks taken by __replica_bits_lock() for an io.
 *
 * Does nothing when the entry has no bits lock. The page range is
 * recomputed from the io with the same range_align() call used to lock it.
 */
static inline void __replica_bits_unlock(const io_t *io, entry_t *ent)
{
        int i;
        uint64_t off_new;
        uint32_t len_new;
        uint32_t pages;
        uint64_t first;

        if (likely(!ent->pio.bits_lock)) {
                return;
        }

        range_align(REPLICA_DISK_MD_PAGE_SIZE, io->offset, (uint32_t)io->size, &off_new, &len_new);

        pages = len_new / REPLICA_DISK_MD_PAGE_SIZE;
        first = off_new / REPLICA_DISK_MD_PAGE_SIZE;

        for (i = 0; i < pages; i++) {
                bits_lock_unlock(ent->pio.bits_lock, first + i);
        }
}

/**
 * Stage 0 of the parallel write path: admit the write.
 *
 * Verifies owner and magic, then, under ent->pio.lock, verifies that the io
 * clock is exactly one ahead of the entry clock, adopts the io clock and
 * bumps the in-flight writer counter. After dropping the lock the clock is
 * stored with the dirty flag set.
 *
 * @param buf unused in this stage
 *
 * @return 0 on success, EPERM on owner/magic mismatch, EINVAL on a stale
 *         clock, EAGAIN when the clock is too far ahead, or a lock error.
 */
static int __replica_write_aio0(mcache_entry_t *cent, const nid_t *writer, const io_t *io,
                               const buffer_t *buf, uint32_t magic, int retry)
{
        int ret;
        entry_t *ent = cent->value;

#if 1
        if (unlikely(nid_cmp(&ent->owner, writer) != 0)) {
                DWARN("chunk "CHKID_FORMAT" owner %s:%s, magic %x:%x, clock %ju:%ju\n",
                      CHKID_ARG(&ent->chkid), network_rname(&ent->owner),
                      network_rname(writer), ent->magic, magic,
                      ent->vclock.clock, io->vclock.clock);
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        if (unlikely(magic != ent->magic)) {
                DWARN("chunk "CHKID_FORMAT", magic %x:%x, clock %ju:%ju\n",
                      CHKID_ARG(&ent->chkid), ent->magic, magic,
                      ent->vclock.clock, io->vclock.clock);
                ret = EPERM;
                GOTO(err_ret, ret);
        }
#endif

        ret = sy_spin_lock(&ent->pio.lock);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* stale or duplicated clock */
        if (unlikely(io->vclock.clock <= ent->vclock.clock)) {
                DWARN("chunk "CHKID_FORMAT" owner %s:%s, magic %x:%x, clock %ju:%ju\n",
                      CHKID_ARG(&ent->chkid), network_rname(&ent->owner),
                      network_rname(writer), ent->magic, magic,
                      ent->vclock.clock, io->vclock.clock);
                ret = EINVAL;
                GOTO(err_lock, ret);
        }

        /* a predecessor has not been admitted yet */
        if (unlikely(io->vclock.clock > ent->vclock.clock + 1)) {
                if (retry > REPLICA_MAX_RETRY / 2) {
                        DBUG("chunk "CHKID_FORMAT" op %ju --> %ju\n",
                              CHKID_ARG(&ent->chkid), io->vclock.clock, ent->vclock.clock);
                }

                ret = EAGAIN;
                goto err_lock;
        }

        YASSERT(io->vclock.clock == ent->vclock.clock + 1);

        /* admit: advance the in-memory clock and count this writer */
        ent->vclock = io->vclock;
        ent->pio.writing++;
        YASSERT(ent->pio.writing < 1024);

        sy_spin_unlock(&ent->pio.lock);

        /* record the new clock with dirty=1 before any data is written */
        ret = __replica_write_clock(ent, &io->vclock, 1);
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
        }

        return 0;
err_lock:
        sy_spin_unlock(&ent->pio.lock);
err_ret:
        return ret;
}

/**
 * Stage 1 of the parallel write path: write the data.
 *
 * Takes the per-page bit locks for the io range, writes through
 * __replica_write_aio__() using a snapshot of the entry's disk location,
 * releases the bit locks, and on success resumes waiters for this clock.
 *
 * @return 0 on success, otherwise the error from the data write.
 */
static int __replica_write_aio1(mcache_entry_t *cent, const io_t *io, const buffer_t *buf)
{
        int ret;
        entry_t *ent = cent->value;
        diskloc_t where = ent->loc;     /* local copy of the location */

        DBUG("lock "CHKID_FORMAT", clock %llu\n", CHKID_ARG(&io->id), (LLU)io->vclock.clock);

        __replica_bits_lock(io, ent);

        ret = __replica_write_aio__(ent->pool, io, buf, &where);
        if (unlikely(ret)) {
                // TODO EIO
                DWARN("pool %s disk %u %u ret %d\n", ent->pool, where.diskid, where.idx, ret);
                // UNIMPLEMENTED(__DUMP__);
                GOTO(err_unlock, ret);
        }

        __replica_bits_unlock(io, ent);

        __replica_resume(ent, &io->vclock);

        return 0;
err_unlock:
        __replica_bits_unlock(io, ent);
        return ret;
}

/**
 * Stage 2 of the parallel write path: retire the write.
 *
 * Decrements the in-flight writer counter under ent->pio.lock. If the
 * counter reads zero afterwards, the clock is stored again with the dirty
 * flag cleared.
 *
 * @return 0 on success, otherwise a lock or clock-store error.
 */
static int __replica_write_aio2(mcache_entry_t *cent, const io_t *io)
{
        int ret;
        entry_t *ent = cent->value;

        ret = sy_spin_lock(&ent->pio.lock);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ent->pio.writing--;

        sy_spin_unlock(&ent->pio.lock);

        /* counter is re-read after the unlock, as before */
        if (ent->pio.writing != 0) {
                return 0;
        }

        /* last in-flight writer: clear the dirty flag */
        ret = __replica_write_clock(ent, &io->vclock, 0);
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Parallel-chunk write path (built when ENABLE_CHUNK_PARALLEL is set).
 *
 * Looks up the chunk entry and runs the three write stages:
 *   - stage 0 (admit) and stage 1 (data write) under the mcache read lock;
 *   - stage 2 (retire) under the mcache write lock, taken after the read
 *     lock has been dropped.
 *
 * @return 0 on success; otherwise the error of the failing step
 *         (EAGAIN from stage 0 means the io clock is too far ahead).
 *
 * NOTE(review): stage 0 increments ent->pio.writing and only stage 2
 * decrements it; when stage 1 fails this function returns without running
 * stage 2, so the counter stays incremented -- confirm this is intended.
 */
STATIC int __replica_write_aio(const nid_t *writer, const io_t *io,
                               const buffer_t *buf, uint32_t magic, int retry)
{
        int ret;
        mcache_entry_t *cent;

        ret = replica_srv_get(&io->id, &cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* stages 0 and 1 run under the read lock */
        ret = mcache_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = __replica_write_aio0(cent, writer, io, buf, magic, retry);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ret = __replica_write_aio1(cent, io, buf);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        mcache_unlock(cent);
  
        /* stage 2 runs under the write lock */
        ret = mcache_wrlock(cent);
        if (unlikely(ret)) {
                DERROR("lock timeout\n");
                EXIT(EAGAIN);
                GOTO(err_release, ret);
        }

        ret = __replica_write_aio2(cent, io);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        mcache_unlock(cent);
        replica_srv_release(cent);

        return 0;
err_lock:
        mcache_unlock(cent);
err_release:
        replica_srv_release(cent);
err_ret:
        return ret;
}

#else

/**
 * Serialized write path (built when ENABLE_CHUNK_PARALLEL is not set).
 *
 * Under the mcache write lock: validates the request, stores the clock with
 * dirty=1, writes the data, stores the clock again with dirty=0, and resumes
 * waiters for this clock.
 *
 * @return 0 on success; otherwise the error of the failing step
 *         (EAGAIN from the check means the io clock is too far ahead).
 */
STATIC int __replica_write_aio(const nid_t *writer, const io_t *io,
                               const buffer_t *buf, uint32_t magic, int retry)
{
        int ret;
        mcache_entry_t *cent;
        diskloc_t loc;
        entry_t *ent;

        ret = replica_srv_get(&io->id, &cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = mcache_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ent = cent->value;
        ret = __replica_write_check(cent->value, writer, io, magic, retry);
        if (unlikely(ret)) {
                /* EAGAIN takes a plain goto; other errors go through GOTO */
                if (ret == EAGAIN) {
                        goto err_lock;
                } else {
                        GOTO(err_lock, ret);
                }
        }

        /**
         * The controller maintains a clock for every io and propagates it
         * to each replica. Before each io operation the consistency of the
         * replicas is checked (@see __table2_chunk_check); if they are
         * inconsistent, a repair is attempted.
         *
         * On each replica, the clock and the dirty flag track the replica's
         * state; dirty is used to guarantee the atomicity of the transaction.
         */
        ret = __replica_write_clock(ent, &io->vclock, 1);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        loc = ent->loc;
        ent->vclock = io->vclock;

        ret = __replica_write_aio__(ent->pool, io, buf, &loc);
        if (unlikely(ret)) {
                /* the entry's magic is zeroed when the data write fails */
                ent->magic = 0;
                GOTO(err_lock, ret);
        }

        DBUG("lock "CHKID_FORMAT", clock %ju\n", CHKID_ARG(&io->id), io->vclock.clock);

        /* data is on disk: clear the dirty flag */
        ret = __replica_write_clock(ent, &io->vclock, 0);
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
                GOTO(err_lock, ret);
        }

        __replica_resume(ent, &io->vclock);
        
        mcache_unlock(cent);
        replica_srv_release(cent);

        return 0;
//err_cancel:
        //__replica_write_cancel_clock(cent->value, writer, ret);
err_lock:
        mcache_unlock(cent);
err_release:
        replica_srv_release(cent);
err_ret:
        return ret;
}

#endif

/*
 * Snapshot of a queued write waiter, handed to the asynchronous
 * wait-check task. All fields are copied from the waiter's wlist_t.
 */
typedef struct {
        task_t task;      /* task of the waiting writer */
        vclock_t vclock;  /* clock the writer is waiting on (only .clock is filled in) */
        uint32_t magic;   /* magic recorded in the waiter */
        time_t ltime;     /* ltime recorded in the waiter */
        chkid_t chkid;    /* chunk the writer is queued on */
        nid_t writer;     /* node that issued the write */
} arg_t;

/**
 * Task body that checks whether a queued write waiter should be aborted.
 *
 * Looks up the chunk entry and, under the mcache write lock and the
 * wait-queue spin lock, searches the wait queue for the write waiter that
 * matches the snapshot in _arg (clock, op, magic and ltime). If found, the
 * waiter is removed and resumed with ETIME when either
 *   - network_connect() to the writer fails or reports a different ltime, or
 *   - the waiter has been queued for more than gloconf.lease_timeout / 2.
 *
 * @param _arg arg_t snapshot allocated from MEM_CACHE_4K; freed here on
 *             every path.
 */
STATIC void __replica_write_wait_check__(void *_arg)
{
        int ret;
        mcache_entry_t *cent;
        arg_t *arg = _arg;
        chkid_t *chkid = &arg->chkid;
        wlist_t *wlist;
        time_t ltime;
        struct list_head *pos, *n;
        entry_t *ent;

        DINFO("core[%d][%d] write "CHKID_FORMAT" clock %ju check\n",
              arg->task.scheduleid, arg->task.taskid,
              CHKID_ARG(chkid), arg->vclock.clock);
        ret = replica_srv_get(chkid, &cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = mcache_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ent = cent->value;

        ret = sy_spin_lock(&ent->wq.lock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        list_for_each_safe(pos, n, &ent->wq.wlist) {
                /* the list node pointer is used directly as the waiter */
                wlist = (void *)pos;
                if (arg->vclock.clock == wlist->vclock.clock
                    && __OP_WRITE_WAIT__ == wlist->op
                    && arg->magic == wlist->magic
                    && arg->ltime == wlist->ltime) {
                         
                        /* ltime is only read when network_connect() succeeded */
                        ret = network_connect(&arg->writer, &ltime, 1, 0);
                        if (ret || ltime != arg->ltime) {
                                DERROR("core[%d][%d] write "CHKID_FORMAT" clock %ju ret %d reset, wait %u\n",
                                       wlist->task.scheduleid, wlist->task.taskid,
                                       CHKID_ARG(chkid), arg->vclock.clock, ret, (int)(gettime() - wlist->begin));
                                list_del_init(&wlist->hook);
                                schedule_resume(&wlist->task, ETIME, NULL);
                                break;
                        }

                        /* waited longer than half the lease timeout: give up */
                        if (gettime() - wlist->begin > gloconf.lease_timeout / 2) {
                                DERROR("core[%d][%d] write "CHKID_FORMAT" clock %ju begin %ld timeout\n",
                                                wlist->task.scheduleid, wlist->task.taskid,
                                                CHKID_ARG(chkid), arg->vclock.clock, wlist->begin);
                                list_del_init(&wlist->hook);
                                schedule_resume(&wlist->task, ETIME, NULL);
                                break;
                        }
                }
        }

        sy_spin_unlock(&ent->wq.lock);
        
        mcache_unlock(cent);
        replica_srv_release(cent);
        mem_cache_free(MEM_CACHE_4K, _arg);
        
        return;
err_release:
        replica_srv_release(cent);
err_ret:
        mem_cache_free(MEM_CACHE_4K, _arg);
        return;
}

/**
 * Callback passed to schedule_yield1() by __replica_write_wait().
 *
 * Copies the identifying fields of the waiter into a freshly allocated
 * arg_t and spawns the "replica_wait_check" task on it.
 *
 * NOTE(review): the allocation result is used without a NULL check --
 * confirm mem_cache_calloc1() cannot fail here.
 *
 * @param _args the waiter (wlist_t *) being checked
 */
STATIC void __replica_write_wait_check(void *_args)
{
        const wlist_t *waiter = _args;
        arg_t *snap;

        snap = mem_cache_calloc1(MEM_CACHE_4K, 0);

        snap->vclock.clock = waiter->vclock.clock;
        snap->writer = waiter->writer;
        snap->magic = waiter->magic;
        snap->ltime = waiter->ltime;
        snap->chkid = waiter->chkid;
        snap->task = waiter->task;

        schedule_task_new("replica_wait_check", __replica_write_wait_check__, snap, -1);
}

/**
 * Park the current task until the write preceding this io has been applied.
 *
 * Under the mcache write lock the io clock is compared with the entry
 * clock. If the io is still more than one step ahead, a waiter describing
 * this write is queued on the entry and, after the locks are dropped, the
 * task yields via schedule_yield1() with __replica_write_wait_check as its
 * check callback. If the predecessor has already landed, the function
 * returns immediately.
 *
 * The waiter lives on this function's stack; it stays valid because the
 * function does not return until the task is resumed.
 *
 * @param writer node issuing the write
 * @param io     io descriptor of the write
 * @param retry  caller's retry count (logging only)
 * @param magic  magic to record in the waiter
 * @param ltime  ltime to record in the waiter
 *
 * @return 0 when the caller should retry the write, otherwise the error
 *         from the lookup, the lock, or the yield.
 */
STATIC int __replica_write_wait(const nid_t *writer, const io_t *io, int retry,
                                uint32_t magic, time_t ltime)
{
        int ret, wait;
        wlist_t wlist;
        mcache_entry_t *cent;
        entry_t *ent;

        ret = replica_srv_get(&io->id, &cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = mcache_wrlock(cent);
        if (unlikely(ret)) {
                /* drop the reference taken by replica_srv_get() */
                GOTO(err_release, ret);
        }

        ent = cent->value;

        if (io->vclock.clock > ent->vclock.clock + 1) {
                DBUG("write "CHKID_FORMAT" clock %ju %ju\n",
                     CHKID_ARG(&io->id), io->vclock.clock, ent->vclock.clock);

                wlist.vclock = io->vclock;
                wlist.chkid = io->id;
                wlist.writer = *writer;
                wlist.op = __OP_WRITE_WAIT__;
                wlist.magic = magic;
                wlist.ltime = ltime;
                wlist.begin = gettime();
                wlist.task = schedule_task_get();

                __replica_srv_wqadd(ent, &wlist);
                wait = 1;
        } else {
                wait = 0;
        }

        mcache_unlock(cent);
        replica_srv_release(cent);

        if (wait) {
                DBUG("write "CHKID_FORMAT" clock %ju wait, retry %u\n",
                     CHKID_ARG(&io->id), io->vclock.clock, retry);

                ret = schedule_yield1("write_wait_clock", NULL, &wlist,
                                      __replica_write_wait_check, 2);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                DBUG("write "CHKID_FORMAT" clock %ju resume, retry %u\n",
                     CHKID_ARG(&io->id), io->vclock.clock, retry);
        }

        return 0;
err_release:
        replica_srv_release(cent);
err_ret:
        return ret;
}

/**
 * core_request() handler for "replica_write".
 *
 * Unpacks (writer, io, buf, magic, retry) from the argument list, in that
 * order, and forwards them to __replica_write_aio().
 *
 * @return the result of __replica_write_aio().
 */
STATIC int __replica_write_request__(va_list ap)
{
        int ret, retry;
        uint32_t magic;
        const nid_t *writer;
        const io_t *io;
        const buffer_t *buf;

        /* extraction order must match the core_request() call site */
        writer = va_arg(ap, nid_t *);
        io = va_arg(ap, io_t *);
        buf = va_arg(ap, buffer_t *);
        magic = va_arg(ap, uint32_t);
        retry = va_arg(ap, int);

        va_end(ap);

        DBUG("chunk "CHKID_FORMAT" offset %ju size %u\n",
             CHKID_ARG(&io->id), io->offset, io->size);

        ret = __replica_write_aio(writer, io, buf, magic, retry);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Run the write on the core selected by core_hash() for the chunk id,
 * through core_request().
 *
 * @return 0 on success, otherwise the error reported by core_request().
 */
STATIC int __replica_write_request(const nid_t *writer, const io_t *io,
                                   const buffer_t *buf, uint32_t magic, int retry)
{
        int ret = core_request(core_hash(&io->id), -1, "replica_write",
                               __replica_write_request__,
                               writer, io, buf, magic, retry);

        if (likely(ret == 0)) {
                return 0;
        }

        GOTO(err_ret, ret);
err_ret:
        return ret;
}

/**
 * core_request() handler for "replica_write_wait".
 *
 * Unpacks (writer, io, retry, magic, ltime) from the argument list, in that
 * order, and forwards them to __replica_write_wait().
 *
 * @return the result of __replica_write_wait().
 */
STATIC int __replica_write_wait_request__(va_list ap)
{
        int ret;
        const nid_t *writer = va_arg(ap, nid_t *);
        const io_t *io = va_arg(ap, io_t *);
        int retry = va_arg(ap, int);
        uint32_t magic = va_arg(ap, uint32_t);
        time_t ltime = va_arg(ap, time_t);

        va_end(ap);

        DBUG("chunk "CHKID_FORMAT" offset %ju size %u\n",
             CHKID_ARG(&io->id), io->offset, io->size);

        /* argument order is (writer, io, retry, magic, ltime) */
        ret = __replica_write_wait(writer, io, retry, magic, ltime);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Run the write-wait on the core selected by core_hash() for the chunk id,
 * through core_request().
 *
 * @return 0 on success, otherwise the error reported by core_request().
 */
STATIC int __replica_write_wait_request(const nid_t *writer, const io_t *io, int retry,
                                uint32_t magic, time_t ltime)
{
        int ret = core_request(core_hash(&io->id), -1, "replica_write_wait",
                               __replica_write_wait_request__,
                               writer, io, retry, magic, ltime);

        if (likely(ret == 0)) {
                return 0;
        }

        GOTO(err_ret, ret);
err_ret:
        return ret;
}

/**
 * Entry point for writing one io to a local replica.
 *
 * After confirming the connection to the writer, the write is attempted
 * either directly (when running inside the scheduler and the target is a
 * raw chunk) or through a cross-core request. An EAGAIN result means an
 * earlier write on the same chunk has not landed yet: the task waits for
 * its turn and then retries. The number of retries so far is forwarded to
 * the write path so that its diagnostics can tell a first attempt from a
 * long wait.
 *
 * @param writer node issuing the write
 * @param io     io descriptor (chunk id, offset, size, clock)
 * @param buf    data to write
 * @param magic  magic value supplied by the writer
 *
 * @return 0 on success, otherwise an errno value; ENODEV is reported to the
 *         caller as EAGAIN.
 */
int replica_srv_write(const nid_t *writer, const io_t *io, const buffer_t *buf, uint32_t magic)
{
        int ret, retry = 0;
        time_t ltime;

        /*if (likely(io->id.type == __RAW_CHUNK__)) {
                if (core_self()->hash != core_hash(&io->id)) {
                        DWARN("cross core write "CHKID_FORMAT"\n", CHKID_ARG(&io->id));
                }
        } */

        ANALYSIS_BEGIN(0);

        ret = network_connect(writer, &ltime, 1, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

#if ENABLE_CHUNK_DEBUG
        DINFO("chunk "CHKID_FORMAT" offset %ju size %u, clock %ju\n",
             CHKID_ARG(&io->id), io->offset, io->size, io->vclock.clock);
#else
        DBUG("chunk "CHKID_FORMAT" offset %ju size %u, clock %ju\n",
             CHKID_ARG(&io->id), io->offset, io->size, io->vclock.clock);
#endif
        
retry:
        if (likely(schedule_running() && io->id.type == __RAW_CHUNK__)) {
                ret = __replica_write_aio(writer, io, buf, magic, retry);
                if (unlikely(ret)) {
                        if (ret == EAGAIN) {
                                // Preserve the update order of ios on the same chunk:
                                // if an earlier io has not completed yet, queue on
                                // the wait list until it has.
                                ret = __replica_write_wait(writer, io, retry, magic, ltime);
                                if (unlikely(ret))
                                        GOTO(err_ret, ret);

                                retry++;
                                goto retry;
                        } else
                                GOTO(err_ret, ret);
                }

                ANALYSIS_QUEUE(0, IO_WARN, "replica_srv_write");
        } else {
                ret = __replica_write_request(writer, io, buf, magic, retry);
                if (unlikely(ret)) {
                        if (ret == EAGAIN) {
                                ret = __replica_write_wait_request(writer, io, retry, magic, ltime);
                                if (unlikely(ret))
                                        GOTO(err_ret, ret);

                                retry++;
                                goto retry;
                        } else
                                GOTO(err_ret, ret);
                }

                ANALYSIS_QUEUE(0, IO_WARN, "replica_srv_writemd");
        }

        return 0;
err_ret:
        /* callers see a missing device as a retryable condition */
        ret = (ret == ENODEV) ? EAGAIN : ret;
        return ret;
}
